Timeseries Loading
This article describes how to load timeseries from a rule.
The TimeseriesService handles all timeseries retrieval internally, and an instance of it is available under AbstractRule.timeseries_service. It currently provides four methods for retrieving timeseries data, each explained below. Every method returns an instance of the TimeseriesData class (also explained below), which is a container for the retrieved timeseries data.
Timeseries Service Methods
get_latest
def get_latest(
    self,
    datasource_id: str,
    classifiers: list[str],
    date_range: DateRange | tuple[DateRange.InputTimestamp, DateRange.InputTimestamp],
    *,
    channel_family: str | None = None,
    include_annotations: bool = False,
    use_global_cache: bool = False,
) -> TimeseriesData:
    """
    Retrieves the latest version of the timeseries data for the requested `classifiers` for the provided
    `date_range` from the datasource with `datasource_id` and returns it as a :class:`TimeseriesData` object.

    Args:
        datasource_id: The ID of the datasource to retrieve data from.
        classifiers: The channel classifiers to retrieve timeseries data for.
        date_range: The inclusive date range to retrieve timeseries data for. Can also be provided as a tuple
            of (start, end).
        channel_family: The channel family to use for retrieving the data. If *None*, no channel family
            filtering will be used.
        include_annotations: Whether to also retrieve annotations for requested `classifiers`.
        use_global_cache: Whether to store/retrieve requested timeseries in/from a global cache available to
            all flows. Set this to *True* if the requested data is needed many times, like profile or weather
            data.
    """
The TimeseriesService.get_latest method is the standard way to retrieve timeseries and replaces the AbstractRule.load_side_input and AbstractRule.load_timeseries methods. It retrieves the latest version of each timestamp in the given date_range for the requested datasource_id and channel classifiers. Optionally, annotations for those channels can be loaded as well by setting include_annotations. If the same data is requested very frequently (for example profile or weather data), consider setting use_global_cache to True. The retrieved data is then also stored in a globally available cache, which is checked first whenever that data is requested again. This greatly reduces the overhead of requesting the same timeseries data over and over.
Results retrieved with this method are also cached locally (and globally if use_global_cache is set). If the same data, or a portion of it, is requested again, it is served from the local cache instead of from the database. Therefore, unlike with the previous timeseries loading methods, new rules should always request their timeseries data through this method rather than caching it themselves in the flow properties for later use. This also makes rules more independent.
Additionally, any data that was ingested or stored in a previous flow is automatically available in the cache, so this method can return a version that is newer than what is currently stored in the database.
NOTE: This method returns a TimeseriesData object instead of the pandas DataFrame that AbstractRule.load_side_input and AbstractRule.load_timeseries return. To get the same output, call AbstractRule.timeseries_service.get_latest().as_df(), passing the appropriate parameters to both method calls.
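To make the lookup order described above concrete, here is a toy model in plain Python with pandas. It is not how TimeseriesService is implemented: the class ToyTimeseriesService, the fake_backend function, and the assumption that the local cache is checked before the global one are all illustrative, and reuse of partially overlapping ranges is left out.

```python
import pandas as pd


class ToyTimeseriesService:
    """Toy model of the cache lookup order (NOT the real TimeseriesService)."""

    # Shared by every instance, standing in for the global cache available to all flows.
    global_cache: dict = {}

    def __init__(self, backend):
        self.backend = backend  # hypothetical callable that queries the database
        self.local_cache: dict = {}  # per-flow cache

    def get_latest(self, datasource_id, classifiers, date_range, *, use_global_cache=False):
        key = (datasource_id, tuple(classifiers), tuple(date_range))
        if key in self.local_cache:  # 1. local cache (assumed to be checked first)
            return self.local_cache[key]
        if use_global_cache and key in self.global_cache:  # 2. global cache
            data = self.global_cache[key]
        else:  # 3. database
            data = self.backend(datasource_id, classifiers, date_range)
            if use_global_cache:
                self.global_cache[key] = data
        self.local_cache[key] = data  # results are always cached locally
        return data


backend_calls = []


def fake_backend(datasource_id, classifiers, date_range):
    """Hypothetical database query: records the call and returns constant data."""
    backend_calls.append((datasource_id, tuple(classifiers)))
    index = pd.date_range(*date_range, freq="15min")
    return pd.DataFrame({c: 1.0 for c in classifiers}, index=index)


window = ("2024-01-01 00:00", "2024-01-01 01:00")
flow_a = ToyTimeseriesService(fake_backend)
flow_a.get_latest("weather", ["TEMPERATURE"], window, use_global_cache=True)  # database
flow_a.get_latest("weather", ["TEMPERATURE"], window, use_global_cache=True)  # local cache
flow_b = ToyTimeseriesService(fake_backend)  # a different, unrelated flow
flow_b.get_latest("weather", ["TEMPERATURE"], window, use_global_cache=True)  # global cache
print(len(backend_calls))  # 1
```

Three requests for the same weather data reach the (fake) database only once. That saving is the reason the text recommends requesting data again rather than caching it in flow properties.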
get_latest_by
def get_latest_by(
    self,
    datasource_id: str,
    classifiers: list[str],
    date_range: DateRange | tuple[DateRange.InputTimestamp, DateRange.InputTimestamp],
    timestamp: pd.Timestamp | dt.datetime | str,
    *,
    channel_family: str | None = None,
    include_annotations: bool = False,
) -> TimeseriesData:
    """
    Retrieves the latest version by the given `timestamp` of the timeseries data for the requested
    `classifiers` for the provided `date_range` from the datasource with `datasource_id` and returns it as a
    :class:`TimeseriesData` object.

    Args:
        datasource_id: The ID of the datasource to retrieve data from.
        classifiers: The channel classifiers to retrieve timeseries data for.
        date_range: The inclusive date range to retrieve timeseries data for. Can also be provided as a tuple
            of (start, end).
        timestamp: The timeseries version retrieved will be the latest at this given timestamp.
        channel_family: The channel family to use for retrieving the data. If *None*, no channel family
            filtering will be used.
        include_annotations: Whether to also retrieve annotations for requested `classifiers`.
    """
The TimeseriesService.get_latest_by method is a variant of TimeseriesService.get_latest: instead of always retrieving the latest version of the requested channels, it retrieves the latest version as of the specified timestamp. In other words, it returns the state the channels were in at that timestamp. This is useful for pricing, for example, where you need to know what a specific price curve looked like at a specific point in time.
Because this method retrieves a specific historical version rather than a moving “latest” version, both the local and the global cache are disabled for timeseries retrieved with it.
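The idea of retrieving "the state of a channel at a specific timestamp" can be sketched with plain pandas. The sketch assumes a hypothetical long-format history (one row per timestamp and version, with made-up columns timestamp, version and value). latest_as_of is an illustrative helper, not part of the TimeseriesService API: for every timestamp it discards versions created after the cutoff and keeps the newest remaining one.

```python
import pandas as pd

# Hypothetical history: one row per (timestamp, version); "version" is when that value was stored.
history = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(
            ["2024-01-01 00:00"] * 3 + ["2024-01-01 01:00"] * 2, utc=True
        ),
        "version": pd.to_datetime(
            ["2023-12-31 08:00", "2023-12-31 14:00", "2024-01-01 06:00",
             "2023-12-31 08:00", "2023-12-31 20:00"],
            utc=True,
        ),
        "value": [10.0, 11.0, 12.0, 20.0, 21.0],
    }
)


def latest_as_of(history: pd.DataFrame, as_of) -> pd.Series:
    """Return each timestamp's value as it was known at `as_of`."""
    # Ignore every version that was created after the cutoff.
    known = history[history["version"] <= pd.Timestamp(as_of)]
    # Per timestamp, keep the row with the newest remaining version.
    newest = known.sort_values("version").groupby("timestamp").tail(1)
    return newest.set_index("timestamp")["value"].sort_index()


print(latest_as_of(history, "2023-12-31T18:00:00Z").tolist())  # [11.0, 20.0]
print(latest_as_of(history, "2024-01-01T12:00:00Z").tolist())  # [12.0, 21.0]
```

The same timestamps yield different values depending on the cutoff, which is also why caching a single "latest" copy would be wrong for this kind of request.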
get_versions
def get_versions(
    self,
    datasource_id: str,
    classifiers: list[str],
    date_range: DateRange | tuple[DateRange.InputTimestamp, DateRange.InputTimestamp],
    *,
    amount_of_versions: int = 2,
    version_range: DateRange | tuple[DateRange.InputTimestamp, DateRange.InputTimestamp] | None = None,
    channel_family: str | None = None,
    include_annotations: bool = False,
) -> TimeseriesData:
    """
    Retrieves the latest `amount_of_versions` versions (optionally within a given `version_range`) of the
    timeseries data for the requested `classifiers` for the provided `date_range` from the datasource with
    `datasource_id` and returns it as a :class:`TimeseriesData` object.

    Args:
        datasource_id: The ID of the datasource to retrieve data from.
        classifiers: The channel classifiers to retrieve timeseries data for.
        date_range: The inclusive date range to retrieve timeseries data for. Can also be provided as a tuple
            of (start, end).
        amount_of_versions: The maximum amount of versions that will be retrieved for the data. Defaults to 2.
        version_range: If specified, only data whose version falls in this date range will be included. Can
            also be provided as a tuple of (start, end).
        channel_family: The channel family to use for retrieving the data.
        include_annotations: Whether to also retrieve annotations for requested `classifiers`.
    """
The TimeseriesService.get_versions method was added in release 25.01. For each timestamp in the given date_range, it retrieves up to amount_of_versions of the most recent versions for the requested datasource_id and channel classifiers. Annotations for those channels can be loaded as well by setting include_annotations. When version_range is set, only versions that fall within that range are retrieved.
Global caching is not available for this method. Local caching is, and any data retrieved with this method is automatically stored in the local cache.
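The selection that get_versions performs can be sketched in pandas on the same kind of hypothetical long-format history. The helper top_versions is illustrative only, and treating version_range as inclusive on both ends (like date_range) is an assumption.

```python
import pandas as pd

# Hypothetical long-format history: one row per (timestamp, version).
history = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(
            ["2024-01-01 00:00"] * 3 + ["2024-01-01 01:00"] * 2, utc=True
        ),
        "version": pd.to_datetime(
            ["2023-12-31 08:00", "2023-12-31 14:00", "2024-01-01 06:00",
             "2023-12-31 08:00", "2023-12-31 20:00"],
            utc=True,
        ),
        "value": [10.0, 11.0, 12.0, 20.0, 21.0],
    }
)


def top_versions(history: pd.DataFrame, amount_of_versions: int = 2, version_range=None) -> pd.Series:
    """Keep at most `amount_of_versions` newest versions per timestamp.

    `version_range` is an optional (start, end) pair, assumed inclusive; None means no bound.
    """
    rows = history
    if version_range is not None:
        start, end = version_range
        if start is not None:
            rows = rows[rows["version"] >= pd.Timestamp(start)]
        if end is not None:
            rows = rows[rows["version"] <= pd.Timestamp(end)]
    # Sort so the newest versions come last within each timestamp, then take the tail.
    rows = rows.sort_values(["timestamp", "version"])
    kept = rows.groupby("timestamp").tail(amount_of_versions)
    return kept.set_index(["timestamp", "version"])["value"]


before_new_year = (None, "2023-12-31T23:59:59Z")
print(top_versions(history).tolist())  # [11.0, 12.0, 20.0, 21.0]
print(top_versions(history, version_range=before_new_year).tolist())  # [10.0, 11.0, 20.0, 21.0]
```

Note that version_range filters first and amount_of_versions is applied to what remains, so an older version can reappear once newer ones fall outside the range.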
get_latest_datapoint
def get_latest_datapoint(
    self,
    datasource_id: str,
    channel_family: str | None = None,
    channel_classifiers: list[str] | None = None,
    include_annotations: bool = False,
) -> TimeseriesData:
    """
    Retrieves the latest datapoint of the timeseries data for the requested `channel_classifiers` from the
    datasource with `datasource_id` and returns it as a :class:`TimeseriesData` object.

    Args:
        datasource_id: The ID of the datasource to retrieve data from.
        channel_family: The channel family to use for retrieving the data. If *None*, no channel family filtering
            will be used.
        channel_classifiers: The channel classifiers to retrieve timeseries data for.
        include_annotations: Whether to also retrieve annotations for requested `channel_classifiers`.
    """
The TimeseriesService.get_latest_datapoint method was added in release 26.02. It retrieves the datapoint at the latest timestamp for the requested datasource_id and channel classifiers. Annotations for those channels can be loaded as well by setting include_annotations.
Global caching is not available for this method. Local caching is, and any data retrieved with this method is automatically stored in the local cache.
Features
The TimeseriesService has the following characteristics:
- Timeseries data can be loaded from ANY datasource, even those that were not prepared in AbstractRule.prepare_context. The timeseries to be loaded do not require any preparation either.
- The timestamps bounding a request can be provided as pandas Timestamps, datetimes, strings, or even None (meaning no lower/upper bound).
- Timestamps can have any timezone. The returned TimeseriesData represents the data in that same timezone.
- Timeseries data can be retrieved either for the latest version (TimeseriesService.get_latest) or for the latest version at a specific timestamp (TimeseriesService.get_latest_by). The latter retrieves the state of a channel at a specific point in time, which is very important for billing and pricing (as pricing curves change every few hours).
- Retrieved timeseries are automatically cached (except those retrieved with TimeseriesService.get_latest_by). If the same timeseries is requested again, the cache is checked first. It is therefore better to always request the timeseries data you need from the TimeseriesService instead of passing it around in the flow properties. This also allows for more independent rule design.
- This cache is used by ALL internal processes that load timeseries data, which makes flows more efficient.
- Any ingested data, or data stored in the previous flow (if this is a chained flow), is also taken into account when requesting the latest version of the timeseries data.
- When requesting the latest version with TimeseriesService.get_latest, one can also request that the data be temporarily stored in a global cache available to ALL flows. This lets other, unrelated flows access frequently needed data much more quickly.
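The sketch below uses plain pandas to show why the different timestamp types and timezones in the list above can describe the same bound, and how data stored in UTC can be presented in the timezone of a request. The helper to_bound is hypothetical, and taking the display timezone from the start bound is an assumption made for the example.

```python
import datetime as dt

import pandas as pd


def to_bound(value):
    """Hypothetical normalizer for one end of a date range; None means 'no bound'."""
    return None if value is None else pd.Timestamp(value)


# The same instant expressed as a pandas Timestamp, a datetime and a string, in different timezones.
bounds = [
    pd.Timestamp("2024-03-01 01:00", tz="Europe/Amsterdam"),  # CET, UTC+1 on this date
    dt.datetime(2024, 3, 1, 0, 0, tzinfo=dt.timezone.utc),
    "2024-03-01T00:00:00Z",
]
normalized = [to_bound(b) for b in bounds]
print(all(b == normalized[0] for b in normalized))  # True
print(to_bound(None))  # None

# Data stored in UTC, presented in the timezone of the (assumed) start bound of the request.
stored = pd.DataFrame(
    {"PRIMARY_CHANNEL": [1.0, 2.0]},
    index=pd.date_range("2024-03-01", periods=2, freq="15min", tz="UTC"),
)
presented = stored.tz_convert(normalized[0].tz)
print(presented.index[0])  # 2024-03-01 01:00:00+01:00
```

Converting the timezone only changes how the instants are displayed; the underlying points in time stay the same.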
TimeseriesData
TimeseriesData is in charge of storing the timeseries data retrieved by the TimeseriesService, along with the details of the retrieval request. It is a class rather than a plain pandas DataFrame because a class gives us free rein over how the data is stored internally. It also allows the data to be returned in different formats whenever the user actually needs it.
Attributes
The TimeseriesData object stores details about the TimeseriesService request that produced its data. These details are available under the following attributes:
- channel_family: The channel family that was retrieved.
- date_range: The date range for which data was requested. This range can be larger than the range actually covered by the data in the object, in which case no data exists for part of the requested range.
- latest_by_timestamp: The timestamp as of which the data in this object is the latest version. If TimeseriesService.get_latest_by was used, this is the value of the timestamp argument. For TimeseriesService.get_latest, this is None.
- versions: A dict mapping each channel to a pandas DatetimeIndex that lists all version timestamps retrieved for that channel to satisfy the request. This dict can be used to verify that the appropriate versions were retrieved, which is especially useful with the TimeseriesService.get_latest_by method.
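As a sketch of the verification suggested for the versions attribute, the hypothetical helper below checks that no channel's retrieved versions are later than latest_by_timestamp. It takes plain values so it runs without the library. In a rule you would pass data.versions and data.latest_by_timestamp from the TimeseriesData returned by get_latest_by; the channel names and timestamps here are made up.

```python
import pandas as pd


def versions_respect_cutoff(versions: dict, latest_by_timestamp) -> bool:
    """True if no channel's version timestamps are later than the cutoff (hypothetical helper).

    `versions` mirrors the TimeseriesData.versions attribute: channel -> pandas DatetimeIndex.
    """
    if latest_by_timestamp is None:  # get_latest: there is no cutoff to check against
        return True
    cutoff = pd.Timestamp(latest_by_timestamp)
    return all(bool((index <= cutoff).all()) for index in versions.values())


versions = {
    "PRIMARY_CHANNEL": pd.DatetimeIndex(["2024-01-01 06:00", "2024-01-01 09:00"], tz="UTC"),
    "PRICE": pd.DatetimeIndex(["2024-01-01 08:00"], tz="UTC"),
}
print(versions_respect_cutoff(versions, "2024-01-01T10:00:00Z"))  # True
print(versions_respect_cutoff(versions, "2024-01-01T07:00:00Z"))  # False
```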
Methods
as_df
def as_df(
    self,
    *,
    annotation_filter: Iterable[str] | None = None,
    multiple_versions: bool = False
) -> pd.DataFrame:
    """
    Returns the data stored in this :class:`TimeseriesData` object as a :class:`pd.DataFrame` object.

    Args:
        annotation_filter: If not *None*, only annotations (if any) are returned that have a name that is in
            the provided list. Can be formatted either as '<channel_name>:<annotation_name>' (to make filter
            specific to a channel) or '<annotation_name>' (to make filter apply to ALL channels).
        multiple_versions: Specifies whether the function should return a multi index dataframe. If not
            specified the multi index is flattened to a single index and only the latest version of the
            available data is returned.
    """
The TimeseriesData.as_df method returns the timeseries data stored in this TimeseriesData object as a pandas DataFrame, matching the output that the AbstractRule.load_side_input and AbstractRule.load_timeseries methods used to return. An annotation filter can also be provided so that only specific annotations are present in the returned DataFrame. Filters can target a specific channel/annotation combination or just an annotation name.
As of release 25.01, as_df also accepts a multiple_versions argument. If set to True, all available versions of the data are returned. If set to False (the default), only the latest version of the data is returned as a single-index DataFrame, even when multiple versions are available.
NOTE: When multiple_versions is set to True, the result is a multi-index DataFrame whose first level is the timestamp and whose second level is the version.
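To illustrate the multi-index layout from the NOTE above, the snippet below builds a small DataFrame with a (timestamp, version) index by hand and flattens it to the newest version per timestamp. The level names timestamp and version and the data are assumptions for the example, and this is not how as_df is implemented; in practice calling as_df() without multiple_versions already gives you the flattened view.

```python
import pandas as pd

# Hypothetical result of as_df(multiple_versions=True): a (timestamp, version) MultiIndex.
index = pd.MultiIndex.from_arrays(
    [
        pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:00", "2024-01-01 00:15"], utc=True),
        pd.to_datetime(["2023-12-31 08:00", "2023-12-31 14:00", "2023-12-31 08:00"], utc=True),
    ],
    names=["timestamp", "version"],
)
multi = pd.DataFrame({"PRIMARY_CHANNEL": [10.0, 11.0, 20.0]}, index=index)

# Flatten to a single timestamp index, keeping only the newest version of each timestamp.
latest = multi.sort_index().groupby(level="timestamp").tail(1).droplevel("version")
print(latest["PRIMARY_CHANNEL"].tolist())  # [11.0, 20.0]

# All stored versions of one timestamp, now indexed by version.
first_ts = pd.Timestamp("2024-01-01 00:00", tz="UTC")
print(multi.xs(first_ts, level="timestamp")["PRIMARY_CHANNEL"].tolist())  # [10.0, 11.0]
```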
slice_by_version
def slice_by_version(
    self, version: str | DateRange.InputTimestamp | None = None
) -> dict[DateRange.InputTimestamp, "TimeseriesData"]:
    """
    Slice the data by a specific version. If a version is supplied, a TimeseriesData object for that version
    is returned. If not, a TimeseriesData object for every version is returned. If the dataframe is empty or
    the specified version does not exist, an empty dictionary is returned.

    Args:
        version: Version timestamp to slice the data by.
    """
The TimeseriesData.slice_by_version method is available as of release 25.01. It returns a dictionary that maps each version timestamp to a TimeseriesData object containing only that version. If a version is supplied, the dictionary contains a single entry for that version. Otherwise, it contains one TimeseriesData object for every version in the DataFrame.
If the supplied version does not exist in the data, or the DataFrame in the TimeseriesData object contains no data, an empty dictionary is returned.
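The documented slicing behavior can be mirrored on a plain (timestamp, version) DataFrame. slice_frame_by_version below is a hypothetical helper that returns DataFrames rather than TimeseriesData objects. It is meant to show the expected dictionary shape, not the library's implementation.

```python
import pandas as pd

# Hypothetical multi-version data with a (timestamp, version) MultiIndex.
multi = pd.DataFrame(
    {"PRIMARY_CHANNEL": [10.0, 11.0, 20.0]},
    index=pd.MultiIndex.from_arrays(
        [
            pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:00", "2024-01-01 00:15"], utc=True),
            pd.to_datetime(["2023-12-31 08:00", "2023-12-31 14:00", "2023-12-31 08:00"], utc=True),
        ],
        names=["timestamp", "version"],
    ),
)


def slice_frame_by_version(frame: pd.DataFrame, version=None) -> dict:
    """Map version -> rows of that version, indexed by timestamp (hypothetical helper)."""
    if frame.empty:
        return {}
    versions = frame.index.get_level_values("version")
    if version is not None:
        version = pd.Timestamp(version)
        if version not in versions:  # unknown version -> empty dict
            return {}
        return {version: frame.xs(version, level="version")}
    # No version given: one entry per distinct version.
    return {v: frame.xs(v, level="version") for v in versions.unique()}


slices = slice_frame_by_version(multi)
print(len(slices))  # 2
print(slice_frame_by_version(multi, "2020-01-01T00:00:00Z"))  # {}
```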
Features
- The timezone given to the TimeseriesService method that created this object is also stored. Any method that returns the timeseries data to the user presents the data in that timezone.
- Annotation filtering can be applied to the stored timeseries data (see the signature of TimeseriesData.as_df), either on channel-specific annotations or on annotation names in general.
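The two annotation filter formats accepted by as_df can be expressed as a simple matching rule. annotation_passes is a hypothetical helper and the annotation names are invented; it only encodes the rule stated in the as_df docstring.

```python
def annotation_passes(channel: str, annotation: str, annotation_filter=None) -> bool:
    """Return True if `annotation` on `channel` is kept by `annotation_filter` (hypothetical helper).

    Entries are either '<channel_name>:<annotation_name>' (channel-specific) or
    '<annotation_name>' (applies to ALL channels); None keeps every annotation.
    """
    if annotation_filter is None:
        return True
    wanted = set(annotation_filter)
    return annotation in wanted or f"{channel}:{annotation}" in wanted


filters = ["estimated", "PRIMARY_CHANNEL:validated"]
print(annotation_passes("PRIMARY_CHANNEL", "validated", filters))  # True
print(annotation_passes("PRICE", "validated", filters))  # False
print(annotation_passes("PRICE", "estimated", filters))  # True
```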
Writing unit tests
When writing unit tests for rules that use the TimeseriesService, initialize your rules with a mocked TimeseriesService and mock the output of TimeseriesService.get_latest or TimeseriesService.get_latest_by so that it returns TimeseriesData objects. To make this easier, TimeseriesData has a utility method, TimeseriesData.from_df, which creates a TimeseriesData object from a pandas DataFrame. This lets you build pandas DataFrames with your test data, convert them directly to TimeseriesData objects, and have the mocked TimeseriesService return them.
For example, your unit test could look something like this (with the appropriate import for your rule class):
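import pandas as pd
from energyworx_public.domain import TimeseriesData

from my_rule import MyRule


class TestMyRule:
    def test_rule(self, mocker):
        # Initialize the rule with a mocked TimeseriesService (mocker is the pytest-mock fixture)
        rule = MyRule(timeseries_service=mocker.MagicMock())
        # Test data: a single 15-minute interval for PRIMARY_CHANNEL
        df = pd.DataFrame(
            index=pd.date_range("2021-01-01T00:00:00Z", periods=1, freq="15min"),
            data={"PRIMARY_CHANNEL": 1},
        )
        # Each call to get_latest returns the next item of side_effect, so provide one item per expected call
        rule.timeseries_service.get_latest = mocker.MagicMock(side_effect=[TimeseriesData.from_df(df)])
        rule.apply()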
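When TimeseriesData cannot be imported, or you also want to assert on the request itself, the same idea works with unittest.mock alone (pytest-mock's mocker.MagicMock is the same class). In the sketch below, MyRule and its datasource and classifier values are hypothetical, and the mock's as_df() return value stands in for TimeseriesData.from_df(df). assert_called_once_with verifies that the rule asked for the expected data, and return_value (instead of side_effect) lets the mock answer any number of calls with the same data.

```python
from unittest import mock

import pandas as pd


class MyRule:
    """Hypothetical rule used only for this example."""

    def __init__(self, timeseries_service):
        self.timeseries_service = timeseries_service

    def apply(self):
        data = self.timeseries_service.get_latest(
            "my-datasource", ["PRIMARY_CHANNEL"], ("2021-01-01", "2021-01-02")
        )
        return data.as_df()["PRIMARY_CHANNEL"].sum()


def test_rule_requests_expected_channels():
    df = pd.DataFrame(
        {"PRIMARY_CHANNEL": [1.0, 2.0]},
        index=pd.date_range("2021-01-01T00:00:00Z", periods=2, freq="15min"),
    )
    service = mock.MagicMock()
    # Stand-in for TimeseriesData.from_df(df): any object whose as_df() returns df.
    service.get_latest.return_value.as_df.return_value = df
    rule = MyRule(timeseries_service=service)

    assert rule.apply() == 3.0
    # Verify the rule requested the expected datasource, classifiers and date range.
    service.get_latest.assert_called_once_with(
        "my-datasource", ["PRIMARY_CHANNEL"], ("2021-01-01", "2021-01-02")
    )
```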